54ab622b5fbfed70e27caef4c00c854757b1b9ab,src/main/java/com/github/ddth/kafka/internal/KafkaConsumerWorker.java,KafkaConsumerWorker,run,#,36
Before Change
}
}
}
Thread.yield();
}
}
}
After Change
mm = it.next();
}
}
if (msgListeners.size() > 0 && mm != null) {
final String topic = mm.topic();
final int partition = mm.partition();
final long offset = mm.offset();
final byte[] key = mm.key();
final byte[] message = mm.message();
final CountDownLatch countDownLatch = new CountDownLatch(msgListeners.size());
for (final IKafkaMessageListener listerner : messageListerners) {
Thread t = new Thread("Kafka-Consumer-Delivery") {
public void run() {
try {
listerner.onMessage(topic, partition, offset, key, message);
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
} finally {
countDownLatch.countDown();
}
}
};
t.start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage(), e);
}
} else {
Thread.yield();
}
}
}